Custom Task Queues
By default, Piscina uses a simple array-based first-in-first-out (fifo) task queue. When a new task is submitted and there are no available workers, tasks are pushed on to the queue until a worker becomes available.
If the default fifo queue is not sufficient, user code may replace the
task queue implementation with a custom implementation using the
taskQueue
option on the Piscina constructor.
Custom task queue objects must implement the TaskQueue
interface,
described below using TypeScript syntax:
interface Task {
readonly [Piscina.queueOptionsSymbol]: object | null;
}
interface TaskQueue {
readonly size: number;
shift(): Task | null;
remove(task: Task): void;
push(task: Task): void;
}
An example of a custom task queue that uses a shuffled priority queue
is available in examples/task-queue
;
The special symbol Piscina.queueOptionsSymbol
may be set as a property
on tasks submitted to run()
or runTask()
as a way of passing additional
options on to the custom TaskQueue
implementation. (Note that because the
queue options are set as a property on the task, tasks with queue
options cannot be submitted as JavaScript primitives).
Built-In Queues
Piscina also provides the FixedQueue
, a more performant task queue implementation based on FixedQueue
from Node.js project.
Here are some benchmarks to compare new FixedQueue
with ArrayTaskQueue
(current default). The benchmarks demonstrate substantial improvements in push and shift operations, especially with larger queue sizes.
Queue size = 1000
(index) | Task Name | ops/sec | Average Time (ns) | Margin | Samples |
---|---|---|---|---|---|
0 | 'ArrayTaskQueue full push + full shift' | '9 692' | 103175.15463917515 | '±0.80%' | 970 |
1 | 'FixedQueue full push + full shift' | '131 879' | 7582.696390658352 | '±1.81%' | 13188 |
Queue size = 100,000
(index) | Task Name | ops/sec | Average Time (ns) | Margin | Samples |
---|---|---|---|---|---|
0 | 'ArrayTaskQueue full push + full shift' | '0' | 1162376920.0000002 | '±1.77%' | 10 |
1 | 'FixedQueue full push + full shift' | '1 026' | 974328.1553396407 | '±2.51%' | 103 |
In terms of Piscina performance itself, using FixedQueue
with a queue size of 100,000 queued tasks can result in up to 6 times faster execution times.
Users can import FixedQueue
from the Piscina
package and pass it as the taskQueue
option to leverage its benefits.
Using FixedQueue
Here's an example of how to use the FixedQueue
:
- Javascript
- Typescript
const { Piscina, FixedQueue } = require("piscina");
const { resolve } = require("path");
// Create a Piscina pool with FixedQueue
const piscina = new Piscina({
filename: resolve(__dirname, "worker.js"),
taskQueue: new FixedQueue(),
});
// Submit tasks to the pool
for (let i = 0; i < 10; i++) {
piscina
.runTask({ data: i })
.then((result) => {
console.log(result);
})
.catch((error) => {
console.error(error);
});
}
import Piscina from "piscina";
import { resolve } from "path";
import { filename } from "./worker";
const piscina = new Piscina({
filename: resolve(__dirname, "./workerWrapper.js"),
workerData: { fullpath: filename },
});
// Submit tasks to the pool
for (let i = 0; i < 10; i++) {
piscina
.runTask({ data: i })
.then((result) => {
console.log(result);
})
.catch((error) => {
console.error(error);
});
}
const { workerData } = require('worker_threads');
if (workerData.fullpath.endsWith(".ts")) {
require("ts-node").register();
}
module.exports = require(workerData.fullpath);
The FixedQueue
will become the default task queue implementation in a next major version.